syntax = "proto3";

package tensorflow;

import "google/protobuf/any.proto";

option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto";

// Represents a remote worker task, specified by job name and task id.
message CoordinatedTask {
  string job_name = 1;
  int32 task_id = 2;
}

// Represents the state of a remote worker
enum CoordinatedTaskState {
  // TASKSTATE_UNSPECIFIED is an invalid state such that indicates a bug.
  TASKSTATE_UNSPECIFIED = 0;
  // TASKSTATE_UNINITIALIZED is an agent-only state. While the agent is
  // disconnected, the service has no way of knowing if the task is
  // initialized/uninitialized.
  TASKSTATE_UNINITIALIZED = 1;
  TASKSTATE_DISCONNECTED = 2;
  TASKSTATE_CONNECTED = 3;
  TASKSTATE_ERROR = 4;
}

// Status payload for all coordination service errors.
// Note: an empty proto may be set if the error is triggered by the task's own
// agent calls (i.e. not propagated by the service from another remote task).
message CoordinationServiceError {
  // Removed fields which used to specify the error origin.
  reserved 1, 2;
  // If true, error is reported via the agent API by the user (and not an
  // internal service error).
  bool is_reported_error = 3;
  // Denotes which task hit the error. If unset, the error originated from the
  // same task that is processing this error.
  CoordinatedTask source_task = 4;
}

message CoordinatedTaskStateInfo {
  CoordinatedTask task = 1;
  CoordinatedTaskState state = 2;
  int32 error_code = 3;
  string error_message = 4;
  CoordinationServiceError error_payload = 5;
}

// Placeholder message to be extended by other runtimes' device representations.
message DeviceInfo {
  repeated google.protobuf.Any device = 1;
}

// Request and response messages for registering a task to the cluster leader.
// A task is uniquely represented by its `job_name`, `task_id` and
// `incarnation`. Leader responds with its `incarnation` to identify a leader
// process.
message RegisterTaskRequest {
  // Removed fields which used to specify the task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  // Moved the field `local_device_attributes` from this request message to
  // WaitForAllTasksRequest defined below.
  reserved 4;
  CoordinatedTask source_task = 5;
}

message RegisterTaskResponse {
  fixed64 leader_incarnation = 1;
}

// Request and response messages for sending heartbeats.
message HeartbeatRequest {
  // Removed fields which used to specify the remote task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  CoordinatedTask source_task = 4;
}

message HeartbeatResponse {
  fixed64 leader_incarnation = 1;
  // If there are failures in cluster, use additional metadata in response to
  // broadcast error code and message to other tasks.
}

// Request and response messages for waiting for all tasks.
message WaitForAllTasksRequest {
  // Removed fields which used to specify the remote task.
  reserved 1, 2;
  // Removed field that specifically used TF device info.
  reserved 3, 4;
  CoordinatedTask source_task = 5;
  // All local device attributes on the request sender;
  DeviceInfo device_info = 6;
}

message WaitForAllTasksResponse {
  fixed64 leader_incarnation = 1;
  // Removed field that specifically used TF device info.
  reserved 2, 3;
  // All devices in the cluster.
  DeviceInfo device_info = 4;
}

// Request and response messages for disconnecting a task from the service.
message ShutdownTaskRequest {
  CoordinatedTask source_task = 1;
}

message ShutdownTaskResponse {}

// Request and response messages for resetting a task state in the service.
message ResetTaskRequest {
  CoordinatedTask source_task = 1;
}

message ResetTaskResponse {}

// Request and response messages for reporting errors to task.
message ReportErrorToTaskRequest {
  int32 error_code = 1;
  string error_message = 2;
  // Removed fields that are embedded in payload.
  reserved 3, 4;
  CoordinationServiceError error_payload = 5;
}

message ReportErrorToTaskResponse {}

// Request and response messages for reporting errors to service instance.
message ReportErrorToServiceRequest {
  int32 error_code = 1;
  string error_message = 2;
  // Removed fields which used to specify the error origin.
  reserved 3, 4;
  CoordinatedTask error_origin = 5;
}

message ReportErrorToServiceResponse {}

// Request and response messages for getting state of a remote task.
message GetTaskStateRequest {
  repeated CoordinatedTask source_task = 1;
}

message GetTaskStateResponse {
  repeated CoordinatedTaskStateInfo task_state = 1;
}

// Message for configuration key value.
// Key is structured like Unix file system, with multiple levels of directory
// names separated by the slash ('/') characters.
message KeyValueEntry {
  string key = 1;
  bytes value = 2;
}

// Request and response messages for inserting configuration key-value data.
message InsertKeyValueRequest {
  KeyValueEntry kv = 1;
}

message InsertKeyValueResponse {}

// Request and response messages for getting configuration key-value data.
message GetKeyValueRequest {
  string key = 1;
}

message GetKeyValueResponse {
  KeyValueEntry kv = 1;
}

message TryGetKeyValueRequest {
  string key = 1;
}

message TryGetKeyValueResponse {
  KeyValueEntry kv = 1;
}

message GetKeyValueDirRequest {
  string directory_key = 1;
}

message GetKeyValueDirResponse {
  string directory_key = 1;
  repeated KeyValueEntry kv = 2;
}

// Request and response messages for deleting configuration key-value data.
// When is_directory is true, delete key-values recursively under `key`.
message DeleteKeyValueRequest {
  string key = 1;
  bool is_directory = 2;
}

message DeleteKeyValueResponse {}

// Request and response messages for generic sync barriers.
message BarrierRequest {
  string barrier_id = 1;
  int64 barrier_timeout_in_ms = 2;
  // Denotes list of tasks that will wait for the barrier. If unspecified, it
  // implies that the entire cluster is participating in the barrier.
  repeated CoordinatedTask tasks = 3;
  // Task that is making the request.
  CoordinatedTask source_task = 4;
}

message BarrierResponse {}

// Request and response messages for  cancelling generic sync barriers.
message CancelBarrierRequest {
  string barrier_id = 1;
  // Task that is making the request.
  CoordinatedTask source_task = 2;
}

message CancelBarrierResponse {}

// Coordination Service defines a TensorFlow service that controls and
// coordinates distributed execution in a cluster of multiple tasks.
//
// The service keeps track of the cluster configuration and the state of cluster
// members or the leader depending on the role of the current task. The
// distributed runtime leverages this service to coordinate and perform cluster
// initialization, check the healthiness of tasks, and propagate error
// messages to the cluster.
service CoordinationService {
  // Register task to coordination service so that the service starts to track
  // liveness of the task. RPC blocks and returns only when it registers to
  // the service successfully, or error happens in the registering process.
  rpc RegisterTask(RegisterTaskRequest) returns (RegisterTaskResponse);

  // Heartbeat message from task to coordination service. Heartbeat is sent from
  // a task to refresh its timestamp on leader to avoid it becoming stale.
  // RPC responds immediately after refreshing the timestamp on leader.
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

  // Wait for all tasks in the cluster to be up and running. The RPC request
  // only gets responded when all tasks have registered, or some error occurs.
  rpc WaitForAllTasks(WaitForAllTasksRequest) returns (WaitForAllTasksResponse);

  // Disconnects task from the service. If `shutdown_barrier_timeout_in_ms` is
  // specified in the config, blocks until all tasks reach the barrier before
  // disconnecting together. If the barrier times out, tasks at the barrier will
  // still disconnect, while an error is reported to tasks that did not reach
  // the barrier on time.
  rpc ShutdownTask(ShutdownTaskRequest) returns (ShutdownTaskResponse);

  // Disconnects task from the service if it is in an ERROR state, thereby
  // allowing it to reconnect via RegisterTask() in the future.
  rpc ResetTask(ResetTaskRequest) returns (ResetTaskResponse);

  // Report error to the task. RPC sets the receiving instance of coordination
  // service agent to error state permanently.
  // TODO(b/195990880): Consider splitting this into a different RPC service.
  rpc ReportErrorToTask(ReportErrorToTaskRequest)
      returns (ReportErrorToTaskResponse);

  // Report task error to coordination service. RPC sets the service-side task
  // state to error, and propagate the error to other tasks in the cluster.
  rpc ReportErrorToService(ReportErrorToServiceRequest)
      returns (ReportErrorToServiceResponse);

  // Get the state of a remote task. Specifically, RPC returns a
  // CoordinatedTaskState, and if the task is in an error status, returns a
  // non-OK error code, non-empty error message and error payload.
  rpc GetTaskState(GetTaskStateRequest) returns (GetTaskStateResponse);

  // Insert configuration key-value that will be accessible to all cluster
  // tasks. The key can be formatted as Unix file path with hierarchy. The
  // coordination service key-value store should only be used for cluster
  // configuration data.
  rpc InsertKeyValue(InsertKeyValueRequest) returns (InsertKeyValueResponse);

  // Get configuration key-value. The request blocks until the key-value data
  // becomes available (i.e., set by a task in the cluster).
  rpc GetKeyValue(GetKeyValueRequest) returns (GetKeyValueResponse);

  // Get configuration key-value. The request does not block, but returns an
  // error if the requested key does not exist.
  rpc TryGetKeyValue(TryGetKeyValueRequest) returns (TryGetKeyValueResponse);

  // Same as GetKeyValue, but returns all values that have keys which are
  // prefixed with the directory key.
  rpc GetKeyValueDir(GetKeyValueDirRequest) returns (GetKeyValueDirResponse);

  // Delete configuration key-value. If is_directory is set in request,
  // recursively clean up all key-values under the path specified by `key`.
  rpc DeleteKeyValue(DeleteKeyValueRequest) returns (DeleteKeyValueResponse);

  // Blocks until all (or a subset of) tasks are at the barrier or the barrier
  // fails.
  //
  // `barrier_id` should be unique across barriers. Once the barrier has passed
  // or failed, subsequent calls will not block, and immediately respond with
  // the previous response.
  //
  // The first WaitAtBarrier() call received by the service for a particular
  // barrier id is special in that it determines the barrier deadline based on
  // timeout duration.
  // However, if subsequent calls by different agents specify a different set of
  // `tasks` for the same `barrier_id`, the barrier will fail instantly.
  //
  // If no tasks are specified (default), the barrier will block for all the
  // connected tasks.
  //
  // Possible service errors:
  //   - DeadlineExceeded: Timed out waiting for specified tasks at the barrier.
  //      Deadline is determined by the server timestamp when it receives the
  //      first WaitAtBarrier() + timeout duration.
  //   - Cancelled: One of the tasks called CancelBarrier().
  //   - Aborted: Service is shutting down.
  //   - Internal: Any participating task is in ERROR state.
  //   - InvalidArgument: (1) Conflicting tasks specified by different agents
  //       for the same barrier, (2) one of the participating tasks is not in
  //       the cluster, or (3) task making the request is not included in the
  //       list of participating tasks.
  rpc Barrier(BarrierRequest) returns (BarrierResponse);

  // Aborts the barrier if it is ongoing.
  // Current and future WaitAtBarrier() calls with the same id will return a
  // CANCELLED error status.
  // Possible service errors:
  //   - FailedPrecondition: Barrier has already been passed.
  rpc CancelBarrier(CancelBarrierRequest) returns (CancelBarrierResponse);
}
